{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "d00fc083",
   "metadata": {
    "papermill": {
     "duration": 0.016307,
     "end_time": "2022-01-10T17:05:19.160432",
     "exception": false,
     "start_time": "2022-01-10T17:05:19.144125",
     "status": "completed"
    },
    "tags": []
   },
   "source": [
    "# Input MQTT"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "645dc2c4",
   "metadata": {
    "papermill": {
     "duration": 0.015642,
     "end_time": "2022-01-10T17:05:19.193671",
     "exception": false,
     "start_time": "2022-01-10T17:05:19.178029",
     "status": "completed"
    },
    "tags": []
   },
   "source": [
    "Reads from MQTT topics in batches or streams. Per default stores messages in buffer (in-memory, not persistent) and returns messages as JSON on HTTP endpoint (which empties the buffer). If forward_url is set messages are pushed to downstream endpoint via POST and on success buffer is emptied.\n",
    "\n",
    "JSON format:\n",
    "[{\"mqtt_receive_timestamp\": .., \"mqtt_receive_topic\": .., mqtt_payload: {..}},...]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "166a39dd-a553-4b2b-be0a-0e6e1fdab6fe",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "#os.environ['create_image']='True'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5ebf4468-2a07-4938-a090-704770bdb762",
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "if bool(os.environ.get('create_image',False)):\n",
    "    docker_file=\"\"\"\n",
    "    FROM registry.access.redhat.com/ubi8/python-39\n",
    "    RUN pip install ipython nbformat paho-mqtt flask\n",
    "    ADD input-mqtt.ipynb /\n",
    "    ENTRYPOINT [\"ipython\",\"/input-mqtt.ipynb\",\"> /tmp/component.log\",\"2> /tmp/component.err\"]\n",
    "    \"\"\"\n",
    "    with open(\"Dockerfile\", \"w\") as text_file:\n",
    "        text_file.write(docker_file)\n",
    "\n",
    "    !docker build -t claimed-input-mqtt:0.2 .\n",
    "    !docker tag claimed-input-mqtt:0.2 romeokienzler/claimed-input-mqtt:0.2\n",
    "    !docker push romeokienzler/claimed-input-mqtt:0.2\n",
    "else:\n",
    "    !pip install paho-mqtt flask\n",
    "    None"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "054a358d",
   "metadata": {
    "papermill": {
     "duration": 0.02608,
     "end_time": "2022-01-10T17:05:21.005692",
     "exception": false,
     "start_time": "2022-01-10T17:05:20.979612",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "import logging\n",
    "import os\n",
    "import re\n",
    "import sys\n",
    "import random\n",
    "from paho.mqtt import client as mqtt_client\n",
    "import threading\n",
    "from flask import Flask\n",
    "from flask import abort\n",
    "import queue\n",
    "import time\n",
    "import json\n",
    "from threading import Thread\n",
    "import requests "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fcab9b87",
   "metadata": {
    "papermill": {
     "duration": 0.028704,
     "end_time": "2022-01-10T17:05:21.052573",
     "exception": false,
     "start_time": "2022-01-10T17:05:21.023869",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "# mqtt port\n",
    "port = int(os.environ.get('port',1883))\n",
    "\n",
    "# mqtt server\n",
    "broker = os.environ.get('broker')\n",
    "\n",
    "# mqtt topics, format 'topic1,...'\n",
    "topics = os.environ.get('topics', '')\n",
    "\n",
    "# mqtt user name\n",
    "username = os.environ.get('username', '')\n",
    "\n",
    "# mqtt password\n",
    "password = os.environ.get('password', '')\n",
    "\n",
    "# forward messages to forward_url via POST\n",
    "forward_url = os.environ.get('forward_url')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "22c51c11-7c47-4877-92a3-88e071506012",
   "metadata": {
    "papermill": {
     "duration": 0.027696,
     "end_time": "2022-01-10T17:05:21.101263",
     "exception": false,
     "start_time": "2022-01-10T17:05:21.073567",
     "status": "completed"
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "parameters = list(\n",
    "    map(lambda s: re.sub('$', '\"', s),\n",
    "        map(\n",
    "            lambda s: s.replace('=', '=\"'),\n",
    "            filter(\n",
    "                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\\/A-Za-z0-9]*', s)),\n",
    "                sys.argv\n",
    "            )\n",
    "    )))\n",
    "\n",
    "for parameter in parameters:\n",
    "    logging.warning('Parameter: ' + parameter)\n",
    "    exec(parameter)\n",
    "    \n",
    "port = int(port)\n",
    "topics = [(topic.strip(),0) for topic in topics.split(',')] if topics != '' else []"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "484d5974-1a9e-49d7-bd95-576372f526d5",
   "metadata": {},
   "outputs": [],
   "source": [
    "def connect_mqtt():\n",
    "    def on_connect(client, userdata, flags, rc):\n",
    "        if rc == 0:\n",
    "            print(\"Connected to MQTT Broker!\")\n",
    "        else:\n",
    "            print(\"Failed to connect, return code %d\\n\", rc)\n",
    "    # Set Connecting Client ID\n",
    "    client = mqtt_client.Client(f'python-mqtt-{random.randint(0, 1000)}')\n",
    "    client.username_pw_set(username, password)\n",
    "    client.on_connect = on_connect\n",
    "    client.connect(broker, port)\n",
    "    return client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f4eb2eb7-2d16-4e29-b4db-9d68f3652c36",
   "metadata": {},
   "outputs": [],
   "source": [
    "client = connect_mqtt()\n",
    "q = queue.Queue()\n",
    "client.subscribe(topics)\n",
    "#client.on_message = lambda client, userdata, msg: q.put(()),json.dumps(msg.payload.decode())))\n",
    "client.on_message = lambda client, userdata, msg: q.put('{\"mqtt_receive_timestamp\": '+\n",
    "                                                        str(int(round(time.time() * 1000)))+\n",
    "                                                                  ', \"mqtt_receive_topic\":  \"'+\n",
    "                                                                  msg.topic+\n",
    "                                                                  '\", \"mqtt_payload\": '+ \n",
    "                                                                  msg.payload.decode()+\n",
    "                                                        '}'\n",
    "                                                                 )\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "014b286c-66b0-4177-8a4b-52e957047cab",
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "x = threading.Thread(target=client.loop_forever)\n",
    "x.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "49bbbfd4-ec55-4cba-86fa-dd2154c1d2a9",
   "metadata": {},
   "outputs": [],
   "source": [
    "def queue_as_json():\n",
    "    return_list = []\n",
    "    try:\n",
    "        while True:\n",
    "            message = q.get(block=False)\n",
    "            print(message)\n",
    "            return_list.append(message)\n",
    "    except queue.Empty:\n",
    "        print('empty queue!')\n",
    "    return \"[\"+(\",\".join(return_list))+\"]\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "21aec07d-2ed0-4cf6-b0b6-8c2eea23f061",
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "def push_downstream(forward_url,queue_as_json):\n",
    "    while True:\n",
    "        message = queue_as_json()\n",
    "        if len(message)>2:\n",
    "            requests.post(forward_url, data=message)\n",
    "        time.sleep(1)\n",
    "\n",
    "\n",
    "if forward_url is not None:\n",
    "    new_thread = Thread(target=push_downstream,args=(forward_url,queue_as_json))\n",
    "    new_thread.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bc3db22f-b7a7-4436-a79f-5159a42993df",
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "app = Flask(__name__)\n",
    "\n",
    "@app.route('/')\n",
    "def index():\n",
    "    if forward_url is None:\n",
    "        return queue_as_json()\n",
    "    else:\n",
    "        return 'MQTT Service running'\n",
    "\n",
    "\n",
    "app.run(host='0.0.0.0', port=8080)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.6"
  },
  "papermill": {
   "default_parameters": {},
   "duration": 41.725932,
   "end_time": "2022-01-10T17:05:59.665500",
   "environment_variables": {},
   "exception": null,
   "input_path": "/home/romeokienzler/gitco/claimed/component-library/input/input-url.ipynb",
   "output_path": "/home/romeokienzler/gitco/claimed/component-library/input/input-url.ipynb",
   "parameters": {},
   "start_time": "2022-01-10T17:05:17.939568",
   "version": "2.3.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
